A skewed dataset is one with a class imbalance in one or more of its columns; this often leads to slow or failing Spark jobs that end with an OOM (out of memory) error.
When joining onto a skewed dataset, there is usually an imbalance in the key(s) on which the join is performed. As a result, the majority of the data falls onto a single partition, which takes much longer to complete than the other partitions.
Some hints for detecting skewness are:
- The key(s) consist mainly of `null` values, which fall onto a single partition.
- A small subset of values for the key(s) makes up a high percentage of the total rows, and those rows fall onto a single partition.
We go through both of these cases and see how we can combat them; a rough check for these hints is sketched right after the template below.
Library Imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
Template
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exploring Joins")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
sc = spark.sparkContext
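A quick way to check the two hints from the introduction on any DataFrame is a small helper like the one below. This is an illustrative sketch, not part of the original job; `key_skew_report`, `df`, and `key` are placeholder names, and you would substitute your own table and join key.
def key_skew_report(df, key, top_n=5):
    """Rough diagnostic: report the null share and the most frequent join keys."""
    total = df.count()

    # Hint 1: a large share of null keys points to the null-skew case.
    null_rows = df.filter(F.col(key).isNull()).count()
    print("null share: {:.1%}".format(null_rows / total))

    # Hint 2: a few keys covering most of the rows points to the high-frequency-key case.
    (
        df.groupBy(key)
        .count()
        .withColumn("share", F.col("count") / total)
        .orderBy(F.desc("count"))
        .show(top_n)
    )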
Situation 2: High Frequency Keys
Initial Datasets
customers = spark.createDataFrame([
    (1, "John"), 
    (2, "Bob"),
], ["customer_id", "first_name"])
customers.toPandas()
| | customer_id | first_name |
|---|---|---|
| 0 | 1 | John | 
| 1 | 2 | Bob | 
orders = spark.createDataFrame([
    (i, 1 if i < 95 else 2, "order #{}".format(i)) for i in range(100) 
], ["id", "customer_id", "order_name"])
orders.toPandas().tail(6)
| | id | customer_id | order_name |
|---|---|---|---|
| 94 | 94 | 1 | order #94 | 
| 95 | 95 | 2 | order #95 | 
| 96 | 96 | 2 | order #96 | 
| 97 | 97 | 2 | order #97 | 
| 98 | 98 | 2 | order #98 | 
| 99 | 99 | 2 | order #99 | 
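Before joining, we can confirm the imbalance by counting orders per key (a quick sanity check, not part of the original pipeline): with the data generated above, customer 1 holds 95 of the 100 orders.
orders.groupBy("customer_id").count().show()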
Option 1: Inner Join
df = customers.join(orders, "customer_id")
df.toPandas().tail(10)
| | customer_id | first_name | id | order_name |
|---|---|---|---|---|
| 90 | 1 | John | 90 | order #90 | 
| 91 | 1 | John | 91 | order #91 | 
| 92 | 1 | John | 92 | order #92 | 
| 93 | 1 | John | 93 | order #93 | 
| 94 | 1 | John | 94 | order #94 | 
| 95 | 2 | Bob | 95 | order #95 | 
| 96 | 2 | Bob | 96 | order #96 | 
| 97 | 2 | Bob | 97 | order #97 | 
| 98 | 2 | Bob | 98 | order #98 | 
| 99 | 2 | Bob | 99 | order #99 | 
df.explain()
== Physical Plan ==
*(5) Project [customer_id#122L, first_name#123, id#126L, order_name#128]
+- *(5) SortMergeJoin [customer_id#122L], [customer_id#127L], Inner
   :- *(2) Sort [customer_id#122L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(customer_id#122L, 200)
   :     +- *(1) Filter isnotnull(customer_id#122L)
   :        +- Scan ExistingRDD[customer_id#122L,first_name#123]
   +- *(4) Sort [customer_id#127L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(customer_id#127L, 200)
         +- *(3) Filter isnotnull(customer_id#127L)
            +- Scan ExistingRDD[id#126L,customer_id#127L,order_name#128]
What Happened:
- We want to find what orders each customer made, so we will be joining the `customers` table to the `orders` table.
- When performing the join, we perform a `hashpartitioning` on `customer_id`.
- From our data creation, this means 95% of the data landed onto a single partition.
Results:
- Similar to the Null Skew case, this means that a single task/partition will take a lot longer than the others, and will most likely error out.
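One way to observe this directly (a rough diagnostic, not something the original job needs) is to tag each joined row with its partition id and count rows per partition; nearly all rows should land in the single partition that holds customer_id = 1.
(
    df.withColumn("partition_id", F.spark_partition_id())
    .groupBy("partition_id")
    .count()
    .orderBy(F.desc("count"))
    .show()
)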